{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "16f6bb49",
   "metadata": {},
   "source": [
    "![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c82657e9",
   "metadata": {},
   "source": [
    "# An Introduction to the Iceberg Java API"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3ee90ad2",
   "metadata": {},
   "source": [
    "## [Part 1 - Loading a Catalog and Creating a Table](https://tabular.io/blog/java-api-part-1/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "72e68c62",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.catalog.Catalog;\n",
    "import org.apache.hadoop.conf.Configuration;\n",
    "import org.apache.iceberg.CatalogProperties;\n",
    "import org.apache.iceberg.jdbc.JdbcCatalog;\n",
    "import org.apache.iceberg.hadoop.HadoopFileIO;\n",
    "\n",
    "Map<String, String> properties = new HashMap<>();\n",
    "\n",
    "properties.put(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());\n",
    "properties.put(CatalogProperties.URI, \"jdbc:postgresql://postgres:5432/demo_catalog\");\n",
    "properties.put(JdbcCatalog.PROPERTY_PREFIX + \"user\", \"admin\");\n",
    "properties.put(JdbcCatalog.PROPERTY_PREFIX + \"password\", \"password\");\n",
    "properties.put(CatalogProperties.WAREHOUSE_LOCATION, \"/home/iceberg/warehouse\");\n",
    "properties.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());\n",
    "\n",
    "JdbcCatalog catalog = new JdbcCatalog();\n",
    "Configuration conf = new Configuration();\n",
    "catalog.setConf(conf);\n",
    "catalog.initialize(\"demo\", properties);\n",
    "catalog.name();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4be615e7",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.Schema;\n",
    "import org.apache.iceberg.types.Types;\n",
    "\n",
    "Schema schema = new Schema(\n",
    "      Types.NestedField.required(1, \"level\", Types.StringType.get()),\n",
    "      Types.NestedField.required(2, \"event_time\", Types.TimestampType.withZone()),\n",
    "      Types.NestedField.required(3, \"message\", Types.StringType.get()),\n",
    "      Types.NestedField.optional(4, \"call_stack\", Types.ListType.ofRequired(5, Types.StringType.get()))\n",
    "    );\n",
    "schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b7299d16",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.PartitionSpec;\n",
    "\n",
    "PartitionSpec spec = PartitionSpec.builderFor(schema)\n",
    "      .hour(\"event_time\")\n",
    "      .identity(\"level\")\n",
    "      .build();\n",
    "spec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4d900c97",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.catalog.TableIdentifier;\n",
    "import org.apache.iceberg.catalog.Namespace;\n",
    "\n",
    "Namespace nyc = Namespace.of(\"nyc\");\n",
    "TableIdentifier name = TableIdentifier.of(nyc, \"logs\");\n",
    "name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8a4d8a6e",
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog.createTable(name, schema, spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7d8c46df",
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog.dropTable(name)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fe62e0a9",
   "metadata": {},
   "source": [
    "## [Part 2 - Table Scans](https://tabular.io/blog/java-api-part-2/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c1e7aa7a",
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog.createTable(name, schema, spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "78c95e06",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.spark.sql.SparkSession;\n",
    "\n",
    "SparkSession spark = SparkSession\n",
    "  .builder()\n",
    "  .master(\"local[*]\")\n",
    "  .appName(\"Java API Demo\")\n",
    "  .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n",
    "  .config(\"spark.sql.catalog.demo\", \"org.apache.iceberg.spark.SparkCatalog\")\n",
    "  .config(\"spark.sql.catalog.demo.catalog-impl\", \"org.apache.iceberg.jdbc.JdbcCatalog\")\n",
    "  .config(\"spark.sql.catalog.demo.uri\", \"jdbc:postgresql://postgres:5432/demo_catalog\")\n",
    "  .config(\"spark.sql.catalog.demo.jdbc.user\", \"admin\")\n",
    "  .config(\"spark.sql.catalog.demo.jdbc.password\", \"password\")\n",
    "  .config(\"spark.sql.catalog.demo.io-impl\", \"org.apache.iceberg.hadoop.HadoopFileIO\")\n",
    "  .config(\"spark.sql.catalog.demo.warehouse\", \"/home/iceberg/warehouse\")\n",
    "  .config(\"spark.sql.defaultCatalog\", \"demo\")\n",
    "  .config(\"spark.eventLog.enabled\", \"true\")\n",
    "  .config(\"spark.eventLog.dir\", \"/home/iceberg/spark-events\")\n",
    "  .config(\"spark.history.fs.logDirectory\", \"/home/iceberg/spark-events\")\n",
    "  .getOrCreate();\n",
    "\n",
    "spark.sparkContext().setLogLevel(\"ERROR\");"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0b17f820",
   "metadata": {},
   "outputs": [],
   "source": [
    "String query = \"INSERT INTO demo.nyc.logs \"\n",
    "             + \"VALUES \"\n",
    "             + \"('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), \"\n",
    "             + \"('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), \"\n",
    "             + \"('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))\";\n",
    "\n",
    "spark.sql(query).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "15ca1822",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.catalog.Catalog;\n",
    "import org.apache.hadoop.conf.Configuration;\n",
    "import org.apache.iceberg.CatalogProperties;\n",
    "import org.apache.iceberg.jdbc.JdbcCatalog;\n",
    "import org.apache.iceberg.hadoop.HadoopFileIO;\n",
    "\n",
    "Map<String, String> properties = new HashMap<>();\n",
    "\n",
    "properties.put(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());\n",
    "properties.put(CatalogProperties.URI, \"jdbc:postgresql://postgres:5432/demo_catalog\");\n",
    "properties.put(JdbcCatalog.PROPERTY_PREFIX + \"user\", \"admin\");\n",
    "properties.put(JdbcCatalog.PROPERTY_PREFIX + \"password\", \"password\");\n",
    "properties.put(CatalogProperties.WAREHOUSE_LOCATION, \"/home/iceberg/warehouse\");\n",
    "properties.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());\n",
    "\n",
    "JdbcCatalog catalog = new JdbcCatalog();\n",
    "Configuration conf = new Configuration();\n",
    "catalog.setConf(conf);\n",
    "catalog.initialize(\"demo\", properties);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3a5cf423",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.Table;\n",
    "import org.apache.iceberg.TableScan;\n",
    "import org.apache.iceberg.catalog.Namespace;\n",
    "import org.apache.iceberg.catalog.TableIdentifier;\n",
    "\n",
    "Namespace nyc = Namespace.of(\"nyc\");\n",
    "TableIdentifier name = TableIdentifier.of(nyc, \"logs\");\n",
    "Table table = catalog.loadTable(name);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e472d6a1",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.io.CloseableIterable;\n",
    "import org.apache.iceberg.data.Record;\n",
    "import org.apache.iceberg.data.IcebergGenerics;\n",
    "\n",
    "CloseableIterable<Record> result = IcebergGenerics.read(table).build();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0d32f41c",
   "metadata": {},
   "outputs": [],
   "source": [
    "for (Record r: result) {\n",
    "    System.out.println(r);\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7dffc238",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.expressions.Expressions;\n",
    "\n",
    "CloseableIterable<Record> result = IcebergGenerics.read(table)\n",
    "        .where(Expressions.equal(\"level\", \"error\"))\n",
    "        .build();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec2b0431",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.CombinedScanTask;\n",
    "import org.apache.iceberg.TableScan;\n",
    "\n",
    "TableScan scan = table.newScan();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "09d13c6b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.expressions.Expressions;\n",
    "\n",
    "TableScan filteredScan = scan.filter(Expressions.equal(\"level\", \"info\")).select(\"message\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1857c10f",
   "metadata": {},
   "outputs": [],
   "source": [
    "Iterable<CombinedScanTask> result = filteredScan.planTasks();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ea206ec7",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.DataFile;\n",
    "\n",
    "CombinedScanTask task = result.iterator().next();\n",
    "DataFile dataFile = task.files().iterator().next().file();\n",
    "System.out.println(dataFile);"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "41e9e10f",
   "metadata": {},
   "source": [
    "## [Part 3 - Table Scans](https://tabular.io/blog/java-api-part-3/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "81033412",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.Schema;\n",
    "import org.apache.iceberg.types.Types;\n",
    "import org.apache.iceberg.catalog.Namespace;\n",
    "import org.apache.iceberg.catalog.TableIdentifier;\n",
    "import org.apache.iceberg.PartitionSpec;\n",
    "\n",
    "Schema schema = new Schema(\n",
    "      Types.NestedField.optional(1, \"event_id\", Types.StringType.get()),\n",
    "      Types.NestedField.optional(2, \"username\", Types.StringType.get()),\n",
    "      Types.NestedField.optional(3, \"userid\", Types.IntegerType.get()),\n",
    "      Types.NestedField.optional(4, \"api_version\", Types.StringType.get()),\n",
    "      Types.NestedField.optional(5, \"command\", Types.StringType.get())\n",
    "    );\n",
    "\n",
    "Namespace webapp = Namespace.of(\"webapp\");\n",
    "TableIdentifier name = TableIdentifier.of(webapp, \"user_events\");\n",
    "catalog.createTable(name, schema, PartitionSpec.unpartitioned());"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "12c45c6b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import java.util.UUID;\n",
    "import com.google.common.collect.ImmutableList;\n",
    "import com.google.common.collect.ImmutableMap;\n",
    "import org.apache.iceberg.data.GenericRecord;\n",
    "\n",
    "GenericRecord record = GenericRecord.create(schema);\n",
    "ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();\n",
    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Bruce\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"grapple\")));\n",
    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Wayne\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"glide\")));\n",
    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Clark\", \"userid\", 1, \"api_version\", \"2.0\", \"command\", \"fly\")));\n",
    "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Kent\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"land\")));\n",
    "ImmutableList<GenericRecord> records = builder.build();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "83bc5319",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.Files;\n",
    "import org.apache.iceberg.io.DataWriter;\n",
    "import org.apache.iceberg.io.OutputFile;\n",
    "import org.apache.iceberg.parquet.Parquet;\n",
    "import org.apache.iceberg.data.parquet.GenericParquetWriter;\n",
    "\n",
    "File f = new File(\"/home/iceberg/warehouse/\" + UUID.randomUUID().toString());\n",
    "OutputFile file = Files.localOutput(f);\n",
    "DataWriter<GenericRecord> dataWriter =\n",
    "    Parquet.writeData(file)\n",
    "        .schema(schema)\n",
    "        .createWriterFunc(GenericParquetWriter::buildWriter)\n",
    "        .overwrite()\n",
    "        .withSpec(PartitionSpec.unpartitioned())\n",
    "        .build();\n",
    "try {\n",
    "  for (GenericRecord record : builder.build()) {\n",
    "    dataWriter.write(record);\n",
    "  }\n",
    "} finally {\n",
    "  dataWriter.close();\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "469e6af4",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.DataFile;\n",
    "\n",
    "DataFile dataFile = dataWriter.toDataFile();"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "142b6ed1",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.catalog.Namespace;\n",
    "import org.apache.iceberg.catalog.TableIdentifier;\n",
    "import org.apache.iceberg.Table;\n",
    "\n",
    "Namespace webapp = Namespace.of(\"webapp\");\n",
    "TableIdentifier name = TableIdentifier.of(webapp, \"user_events\");\n",
    "Table tbl = catalog.loadTable(name);\n",
    "tbl.newAppend().appendFile(dataFile).commit()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c61e9e79",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.iceberg.io.CloseableIterable;\n",
    "import org.apache.iceberg.data.Record;\n",
    "import org.apache.iceberg.data.IcebergGenerics;\n",
    "\n",
    "CloseableIterable<Record> result = IcebergGenerics.read(tbl).build();\n",
    "for (Record r: result) {\n",
    "    System.out.println(r);\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Java",
   "language": "java",
   "name": "java"
  },
  "language_info": {
   "codemirror_mode": "java",
   "file_extension": ".jshell",
   "mimetype": "text/x-java-source",
   "name": "Java",
   "pygments_lexer": "java",
   "version": "11.0.15+10-post-Debian-1deb11u1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
